From: Jeroen van der Heijden Date: Wed, 29 Jul 2020 12:26:36 +0000 (+0200) Subject: Fix empty tags synchronization X-Git-Tag: archive/raspbian/2.0.44-1+rpi1~1^2~3^2~3^2~6^2~6 X-Git-Url: https://dgit.raspbian.org/%22http://www.example.com/cgi/%22/%22http:/www.example.com/cgi/%22?a=commitdiff_plain;h=27edacce0fde5a2e2b275a947b212e084686580a;p=siridb-server.git Fix empty tags synchronization --- diff --git a/include/siri/db/tags.h b/include/siri/db/tags.h index 95c5b879..9a24c6c6 100644 --- a/include/siri/db/tags.h +++ b/include/siri/db/tags.h @@ -49,6 +49,7 @@ void siridb_tags_save(siridb_tags_t * tags); void siridb_tags_init_nseries(siridb_tags_t * tags); sirinet_pkg_t * siridb_tags_pkg(siridb_tags_t * tags, uint16_t pid); sirinet_pkg_t * siridb_tags_series(siridb_series_t * series); +sirinet_pkg_t * siridb_tags_empty(siridb_tags_t * tags); #define siridb_tags_set_require_save(__tags, __tag) \ diff --git a/include/siri/net/protocol.h b/include/siri/net/protocol.h index 7efb3374..d3787e84 100644 --- a/include/siri/net/protocol.h +++ b/include/siri/net/protocol.h @@ -81,6 +81,7 @@ typedef enum BPROTO_DROP_DATABASE, /* empty */ BPROTO_REQ_TAGS, /* empty */ BPROTO_SERIES_TAGS, /* [series name, tag name, ...] */ + BPROTO_EMPTY_TAGS, /* [tag name, tag name, ...] */ } bproto_client_t; /* @@ -131,7 +132,8 @@ typedef enum BPROTO_ACK_TEE_PIPE_NAME, /* empty */ BPROTO_ACK_DROP_DATABASE, /* empty */ BPROTO_RES_TAGS, /* [[name, series], ...] */ - BPROTO_ACK_SERIES_TAGS /* empty */ + BPROTO_ACK_SERIES_TAGS, /* empty */ + BPROTO_ACK_EMPTY_TAGS, /* empty */ } bproto_server_t; #define sirinet_protocol_is_error(tp) (tp >= 64 && tp < 192) diff --git a/itest/test_tags.py b/itest/test_tags.py index 6b871a43..f1d86dcc 100644 --- a/itest/test_tags.py +++ b/itest/test_tags.py @@ -113,6 +113,12 @@ class TestTags(TestBase): self.assertEqual( res, {"success_msg": "Successfully tagged 13 series."}) + res = await self.client0.query(''' + alter series /empty/ tag `EMPTY` + ''') + self.assertEqual( + res, {"success_msg": "Successfully tagged 0 series."}) + await asyncio.sleep(3.0) res = await self.client0.query(''' @@ -164,12 +170,6 @@ class TestTags(TestBase): self.assertEqual( res, {"success_msg": "Successfully tagged 3 series."}) - res = await self.client0.query(''' - alter series /empty/ tag `EMPTY` - ''') - self.assertEqual( - res, {"success_msg": "Successfully tagged 0 series."}) - await self.client0.query(''' alter series 'variance', 'pvariance' untag `OTHER` ''') diff --git a/src/siri/db/groups.c b/src/siri/db/groups.c index 5a1a0311..274667c5 100644 --- a/src/siri/db/groups.c +++ b/src/siri/db/groups.c @@ -429,8 +429,14 @@ static void GROUPS_loop(void * arg) { sleep(GROUPS_LOOP_SLEEP); - if (siridb_is_reindexing(siridb) && (++mod_test % GROUPS_LOOP_DEEP)) + if (groups->status == GROUPS_STOPPING) + break; + + if ((siridb_is_reindexing(siridb) || + siridb_server_self_synchronizing(siridb->server)) && + (++mod_test % GROUPS_LOOP_DEEP)) { + /* less frequently when re-indexing or synchronizing */ continue; } diff --git a/src/siri/db/initsync.c b/src/siri/db/initsync.c index d8d5702e..3d7f3ca4 100644 --- a/src/siri/db/initsync.c +++ b/src/siri/db/initsync.c @@ -223,6 +223,28 @@ void siridb_initsync_fopen(siridb_initsync_t * initsync, const char * opentype) } } +/* + * Call-back function: sirinet_promise_cb + */ +static void INITSYNC_on_empty_tags_response( + sirinet_promise_t * promise, + sirinet_pkg_t * pkg, + int status) +{ + if (status) + { + log_error("Error while sending empty tags (%d)", status); + } + else if (sirinet_protocol_is_error(pkg->tp)) + { + log_error( + "Error occurred while processing data on the new server: " + "(response type: %u)", pkg->tp); + } + + sirinet_promise_decref(promise); +} + /* * Read the next series id and truncate the synchronization file to remove * the last synchronization id. @@ -271,6 +293,24 @@ static void INITSYNC_next_series_id(siridb_t * siridb) } else { + sirinet_pkg_t * pkg; + + /* send empty tags if required */ + pkg = siridb_tags_empty(siridb->tags); + if (pkg) + { + if (siridb_server_send_pkg( + siridb->replica, + pkg, + INITSYNC_TIMEOUT, + (sirinet_promise_cb) INITSYNC_on_empty_tags_response, + NULL, + 0)) + { + free(pkg); + } + } + log_info("Finished initial replica synchronization"); INITSYNC_unlink(initsync); siridb_initsync_free(&siridb->replicate->initsync); diff --git a/src/siri/db/reindex.c b/src/siri/db/reindex.c index 9906ea50..3370df91 100644 --- a/src/siri/db/reindex.c +++ b/src/siri/db/reindex.c @@ -400,6 +400,28 @@ static int REINDEX_next_series_id(siridb_reindex_t * reindex) return rc; } +/* + * Call-back function: sirinet_promise_cb + */ +static void REINDEX_on_empty_tags_response( + sirinet_promise_t * promise, + sirinet_pkg_t * pkg, + int status) +{ + if (status) + { + log_error("Error while sending empty tags (%d)", status); + } + else if (sirinet_protocol_is_error(pkg->tp)) + { + log_error( + "Error occurred while processing data on the new server: " + "(response type: %u)", pkg->tp); + } + + sirinet_promise_decref(promise); +} + /* * This function can raise a SIGNAL */ @@ -416,6 +438,25 @@ static void REINDEX_next(siridb_t * siridb) break; case NEXT_SERIES_END: + { + sirinet_pkg_t * pkg; + + /* send empty tags if required */ + pkg = siridb_tags_empty(siridb->tags); + if (pkg) + { + if (siridb_server_send_pkg( + siridb->reindex->server, + pkg, + REINDEX_TIMEOUT, + (sirinet_promise_cb) REINDEX_on_empty_tags_response, + NULL, + 0)) + { + free(pkg); + } + } + /* update and send the flags */ siridb->server->flags &= ~SERVER_FLAG_REINDEXING; siridb_servers_send_flags(siridb->servers); @@ -431,7 +472,7 @@ static void REINDEX_next(siridb_t * siridb) siri_optimize_continue(); break; - + } case NEXT_SERIES_ERR: break; /* signal is raised */ } diff --git a/src/siri/db/tags.c b/src/siri/db/tags.c index 91b1c6a9..b36fd8c4 100644 --- a/src/siri/db/tags.c +++ b/src/siri/db/tags.c @@ -249,7 +249,7 @@ static int TAGS_series_pkg(siridb_tag_t * tag, TAGS_series_t * w) /* * Main thread. * - * Returns NULL and raises a signal in case of an error. + * Returns NULL in case of an error or no tags. */ sirinet_pkg_t * siridb_tags_series(siridb_series_t * series) { @@ -277,6 +277,42 @@ sirinet_pkg_t * siridb_tags_series(siridb_series_t * series) return sirinet_packer2pkg(w.packer, 0, BPROTO_SERIES_TAGS); } +/* + * Main thread. + */ +static int TAGS_empty_tag_pkg(siridb_tag_t * tag, qp_packer_t * packer) +{ + return tag->series->len == 0 + ? qp_add_string(packer, tag->name) == 0 + : 0; +} + +/* + * Main thread. + * + * Returns NULL in case of an error or no empty tags. + */ +sirinet_pkg_t * siridb_tags_empty(siridb_tags_t * tags) +{ + qp_packer_t * packer = sirinet_packer_new(1024); + + if (packer == NULL || qp_add_type(packer, QP_ARRAY_OPEN)) + { + return NULL; + } + + if (ct_values( + tags->tags, + (ct_val_cb) TAGS_empty_tag_pkg, + packer) == 0) + { + free(packer); + return NULL; + } + + return sirinet_packer2pkg(packer, 0, BPROTO_EMPTY_TAGS); +} + /* * Main thread. */ diff --git a/src/siri/net/bserver.c b/src/siri/net/bserver.c index 0b6fca95..1a7744a4 100644 --- a/src/siri/net/bserver.c +++ b/src/siri/net/bserver.c @@ -65,6 +65,7 @@ static void on_disable_backup_mode( sirinet_pkg_t * pkg); static void on_req_tags(sirinet_stream_t * client, sirinet_pkg_t * pkg); static void on_series_tags(sirinet_stream_t * client, sirinet_pkg_t * pkg); +static void on_empty_tags(sirinet_stream_t * client, sirinet_pkg_t * pkg); static uv_loop_t * loop = NULL; static struct sockaddr_storage server_addr; @@ -291,6 +292,9 @@ static void on_data(sirinet_stream_t * client, sirinet_pkg_t * pkg) case BPROTO_SERIES_TAGS: on_series_tags(client, pkg); break; + case BPROTO_EMPTY_TAGS: + on_empty_tags(client, pkg); + break; } } @@ -940,3 +944,72 @@ static void on_series_tags(sirinet_stream_t * client, sirinet_pkg_t * pkg) sirinet_pkg_send(client, package); } } + +static void on_empty_tags(sirinet_stream_t * client, sirinet_pkg_t * pkg) +{ + SERVER_CHECK_AUTHENTICATED(client, server) + + sirinet_pkg_t * package = NULL; + siridb_t * siridb = client->siridb; + + LOGC("on empty tags..."); + + if (~siridb->server->flags & SERVER_FLAG_RUNNING) + { + log_error("Cannot tag series because of having status %d", + siridb->server->flags); + + package = sirinet_pkg_new( + pkg->pid, + 0, + BPROTO_ERR_DROP_SERIES, + NULL); + } + else + { + qp_unpacker_t unpacker; + qp_unpacker_init(&unpacker, pkg->data, pkg->len); + + if (qp_is_array(qp_next(&unpacker, NULL))) + { + qp_obj_t qp_tag_name; + + uv_mutex_lock(&siridb->tags->mutex); + + while (qp_next(&unpacker, &qp_tag_name) == QP_RAW) + { + siridb_tag_t * tag = ct_getn( + siridb->tags->tags, + qp_tag_name.via.str, + qp_tag_name.len); + + if (tag == NULL) + { + tag = siridb_tags_add_n( + siridb->tags, + qp_tag_name.via.str, + qp_tag_name.len); + + siridb_tags_set_require_save(siridb->tags, tag); + } + } + + uv_mutex_unlock(&siridb->tags->mutex); + + package = sirinet_pkg_new( + pkg->pid, + 0, + BPROTO_ACK_EMPTY_TAGS, + NULL); + } + else + { + log_error("Illegal back-end empty tags package received"); + } + } + + if (package != NULL) + { + sirinet_pkg_send(client, package); + } +} diff --git a/src/siri/net/protocol.c b/src/siri/net/protocol.c index f07d2508..fc90cd98 100644 --- a/src/siri/net/protocol.c +++ b/src/siri/net/protocol.c @@ -88,6 +88,7 @@ const char * sirinet_bproto_client_str(bproto_client_t n) case BPROTO_DROP_DATABASE: return "BPROTO_DROP_DATABASE"; case BPROTO_REQ_TAGS: return "BPROTO_REQ_TAGS"; case BPROTO_SERIES_TAGS: return "BPROTO_SERIES_TAGS"; + case BPROTO_EMPTY_TAGS: return "BPROTO_EMPTY_TAGS"; default: sprintf(protocol_str, "BPROTO_CLIENT_TYPE_UNKNOWN (%d)", n); return protocol_str; @@ -127,6 +128,7 @@ const char * sirinet_bproto_server_str(bproto_server_t n) case BPROTO_ACK_DROP_DATABASE: return "BPROTO_ACK_DROP_DATABASE"; case BPROTO_RES_TAGS: return "BPROTO_RES_TAGS"; case BPROTO_ACK_SERIES_TAGS: return "BPROTO_ACK_SERIES_TAGS"; + case BPROTO_ACK_EMPTY_TAGS: return "BPROTO_ACK_EMPTY_TAGS"; default: sprintf(protocol_str, "BPROTO_SERVER_TYPE_UNKNOWN (%d)", n); return protocol_str;